package com.uxsino.commons.db.queue;

import com.alibaba.fastjson.JSON;
import com.uxsino.commons.threads.GlobalThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import javax.transaction.Transactional;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @ClassName DBThreadPool
 * @Description TODO
 * @Author <a href="mailto:royrxc@gmail.com">Ran</a>
 * @Daate 2020/3/23 15:26
 **/
@Component
public class DBThreadPool {

    private static final ConcurrentHashMap<String, Future<?>> CONSUMERS = new ConcurrentHashMap<>();
    private static final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    private static final Logger LOG  = LoggerFactory.getLogger(DBThreadPool.class);
    private static final Executor executor = GlobalThreadPool.getPool("DB_THREAD_EXECUTOR", 200, 65535, 10 * 1000, 2048);

    private static final int MAX_WAIT_TIMEOUT_MS = 5*60*1000;

    @Autowired
    DBQueue queue;


    static  {
        scheduler.setPoolSize(Runtime.getRuntime().availableProcessors() * 500);
        scheduler.setThreadGroupName("DB_SCHEDULE_SCHEDULER");
        scheduler.initialize();
        scheduler.getScheduledThreadPoolExecutor().setKeepAliveTime(60, TimeUnit.SECONDS);
        scheduler.getScheduledThreadPoolExecutor().allowCoreThreadTimeOut(true);
    }

    /**
     * b
     * @param key：队列标识
     * @param consumer： 消费者
     * @param converter, 转换器，转换为需要的实体
     * @param cn: concurrent number, 并发数
     * @param <T>
     */
    public <T> void registe(String key, Consumer<T> consumer, Function<String, T> converter, int cn){
        synchronized (CONSUMERS){
            Future<?> future = CONSUMERS.remove(key);
            if(future != null){
                future.cancel(false);
            }
            future = scheduler.scheduleWithFixedDelay(()->{
                int threads = Math.max(1, cn);
                long dataSize =  queue.size(key);//.poll(key, threads);
                if(dataSize == 0){
                    try {
                        queue.wait(key, MAX_WAIT_TIMEOUT_MS);
                    }catch (Exception e){}
                    return;
                }
                CountDownLatch latch = new CountDownLatch(threads);
                Stream.iterate(0, i->i++).limit(threads).forEach(i->{
                    executor.execute(()->{
                        try {
                            List<String> data = null;
                            int pageSize = Math.max(10, threads);
                            while (latch.getCount() == threads && !(data = queue.poll(key, pageSize)).isEmpty()){
                                data.forEach(itm->{
                                    try {
                                        consumer.accept(converter.apply(itm));
                                    }catch (Exception e){
                                        LOG.warn("[db queue consumer {} error]->{}", key, e);
                                    }
                                });
                            }
                        }finally {
                            latch.countDown();
                        }
                    });
                });
                try {
                    latch.await();
                }catch (Exception e){
                }
            }, 10L);
            CONSUMERS.put(key, future);
        }
        LOG.info("DBThreadpool CONSUMERS size: {}", CONSUMERS.size());
    }

    public void unRegiste(String key){
        synchronized (CONSUMERS){
            Future<?> future = CONSUMERS.remove(key);
            if(future != null){
                future.cancel(false);
            }
        }
    }

    public void push(Collection<Object> data, String name){
        if(data == null || data.isEmpty()){
            return;
        }
        push(name, data.stream().map(JSON::toJSONString).collect(Collectors.toList()));
    }

    public void push(String name, Collection<String> data){
        queue.push(name, data);
    }

    @Transactional
    public void clean(String name){
        queue.clean(name);
    }



    public Long size(String queueName){
        return queue.size(queueName);
    }

    public Long size(){
        return queue.size();
    }

}
